/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.phoenix.trace;

import static org.apache.phoenix.metrics.MetricInfo.ANNOTATION;
import static org.apache.phoenix.metrics.MetricInfo.DESCRIPTION;
import static org.apache.phoenix.metrics.MetricInfo.END;
import static org.apache.phoenix.metrics.MetricInfo.HOSTNAME;
import static org.apache.phoenix.metrics.MetricInfo.PARENT;
import static org.apache.phoenix.metrics.MetricInfo.SPAN;
import static org.apache.phoenix.metrics.MetricInfo.START;
import static org.apache.phoenix.metrics.MetricInfo.TAG;
import static org.apache.phoenix.metrics.MetricInfo.TRACE;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.commons.configuration2.SubsetConfiguration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.MetricsSink;
import org.apache.hadoop.metrics2.MetricsTag;
import org.apache.phoenix.compile.MutationPlan;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
import org.apache.phoenix.metrics.MetricInfo;
import org.apache.phoenix.metrics.Metrics;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.util.QueryUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.thirdparty.com.google.common.base.Joiner;

/**
 * Write the metrics to a phoenix table. Generally, this class is instantiated via hadoop-metrics2
 * property files. Specifically, you would create this class by adding the following to by This
 * would actually be set as: <code>
 * [prefix].sink.[some instance name].class=org.apache.phoenix.trace.PhoenixMetricsSink
 * </code>, where <tt>prefix</tt> is either:
 * <ol>
 * <li>"phoenix", for the client</li>
 * <li>"hbase", for the server</li>
 * </ol>
 * and <tt>some instance name</tt> is just any unique name, so properties can be differentiated if
 * there are multiple sinks of the same type created
 */
public class PhoenixMetricsSink implements MetricsSink {

  private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixMetricsSink.class);

  private static final String VARIABLE_VALUE = "?";

  private static final Joiner COLUMN_JOIN = Joiner.on(".");
  static final String TAG_FAMILY = "tags";
  /**
   * Count of the number of tags we are storing for this row
   */
  static final String TAG_COUNT = COLUMN_JOIN.join(TAG_FAMILY, "count");

  static final String ANNOTATION_FAMILY = "annotations";
  static final String ANNOTATION_COUNT = COLUMN_JOIN.join(ANNOTATION_FAMILY, "count");

  /**
   * Join strings on a comma
   */
  private static final Joiner COMMAS = Joiner.on(',');

  private Connection conn;

  private String table;

  public PhoenixMetricsSink() {
    LOGGER.info("Writing tracing metrics to phoenix table");

  }

  @Override
  public void init(SubsetConfiguration config) {
    Metrics.markSinkInitialized();
    LOGGER.info("Phoenix tracing writer started");
  }

  /**
   * Initialize <tt>this</tt> only when we need it
   */
  private void lazyInitialize() {
    synchronized (this) {
      if (this.conn != null) {
        return;
      }
      try {
        // create the phoenix connection
        Properties props = new Properties();
        props.setProperty(QueryServices.TRACING_FREQ_ATTRIB, Tracing.Frequency.NEVER.getKey());
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        Connection conn = QueryUtil.getConnectionOnServer(props, conf);
        // enable bulk loading when we have enough data
        conn.setAutoCommit(true);

        String tableName = conf.get(QueryServices.TRACING_STATS_TABLE_NAME_ATTRIB,
          QueryServicesOptions.DEFAULT_TRACING_STATS_TABLE_NAME);

        initializeInternal(conn, tableName);
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }
  }

  private void initializeInternal(Connection conn, String tableName) throws SQLException {
    this.conn = conn;
    // ensure that the target table already exists
    if (!traceTableExists(conn, tableName)) {
      createTable(conn, tableName);
    }
    this.table = tableName;
  }

  private boolean traceTableExists(Connection conn, String traceTableName) throws SQLException {
    try {
      conn.unwrap(PhoenixConnection.class).getTable(traceTableName);
      return true;
    } catch (TableNotFoundException e) {
      return false;
    }
  }

  /**
   * Used for <b>TESTING ONLY</b> Initialize the connection and setup the table to use the
   * {@link org.apache.phoenix.query.QueryServicesOptions#DEFAULT_TRACING_STATS_TABLE_NAME}
   * @param conn      to store for upserts and to create the table (if necessary)
   * @param tableName TODO
   * @throws SQLException if any phoenix operation fails
   */
  @VisibleForTesting
  public void initForTesting(Connection conn, String tableName) throws SQLException {
    initializeInternal(conn, tableName);
  }

  /**
   * Create a stats table with the given name. Stores the name for use later when creating upsert
   * statements
   * @param conn  connection to use when creating the table
   * @param table name of the table to create
   * @throws SQLException if any phoenix operations fails
   */
  private void createTable(Connection conn, String table) throws SQLException {
    // only primary-key columns can be marked non-null
    String ddl = "create table if not exists " + table + "( " + TRACE.columnName
      + " bigint not null, " + PARENT.columnName + " bigint not null, " + SPAN.columnName
      + " bigint not null, " + DESCRIPTION.columnName + " varchar, " + START.columnName
      + " bigint, " + END.columnName + " bigint, " + HOSTNAME.columnName + " varchar, " + TAG_COUNT
      + " smallint, " + ANNOTATION_COUNT + " smallint" + "  CONSTRAINT pk PRIMARY KEY ("
      + TRACE.columnName + ", " + PARENT.columnName + ", " + SPAN.columnName + "))\n" +
      // We have a config parameter that can be set so that tables are
      // transactional by default. If that's set, we still don't want these system
      // tables created as transactional tables, make these table non
      // transactional
      PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
    try (PreparedStatement stmt = conn.prepareStatement(ddl)) {
      stmt.execute();
    }
  }

  @Override
  public void flush() {
    try {
      this.conn.commit();
    } catch (SQLException e) {
      LOGGER.error("Failed to commit changes to table", e);
    }
  }

  /**
   * Add a new metric record to be written.
   */
  @Override
  public void putMetrics(MetricsRecord record) {
    // its not a tracing record, we are done. This could also be handled by filters, but safer
    // to do it here, in case it gets misconfigured
    if (!record.name().startsWith(TracingUtils.METRIC_SOURCE_KEY)) {
      return;
    }

    // don't initialize until we actually have something to write
    lazyInitialize();

    String stmt = "UPSERT INTO " + table + " (";
    // drop it into the queue of things that should be written
    List<String> keys = new ArrayList<String>();
    List<Object> values = new ArrayList<Object>();
    // we need to keep variable values in a separate set since they may have spaces, which
    // causes the parser to barf. Instead, we need to add them after the statement is prepared
    List<String> variableValues = new ArrayList<String>(record.tags().size());
    keys.add(TRACE.columnName);
    values.add(Long.parseLong(record.name().substring(TracingUtils.METRIC_SOURCE_KEY.length())));

    keys.add(DESCRIPTION.columnName);
    values.add(VARIABLE_VALUE);
    variableValues.add(record.description());

    // add each of the metrics
    for (AbstractMetric metric : record.metrics()) {
      // name of the metric is also the column name to which we write
      keys.add(MetricInfo.getColumnName(metric.name()));
      values.add(metric.value());
    }

    // get the tags out so we can set them later (otherwise, need to be a single value)
    int annotationCount = 0;
    int tagCount = 0;
    for (MetricsTag tag : record.tags()) {
      if (tag.name().equals(ANNOTATION.traceName)) {
        addDynamicEntry(keys, values, variableValues, ANNOTATION_FAMILY, tag, ANNOTATION,
          annotationCount);
        annotationCount++;
      } else if (tag.name().equals(TAG.traceName)) {
        addDynamicEntry(keys, values, variableValues, TAG_FAMILY, tag, TAG, tagCount);
        tagCount++;
      } else if (tag.name().equals(HOSTNAME.traceName)) {
        keys.add(HOSTNAME.columnName);
        values.add(VARIABLE_VALUE);
        variableValues.add(tag.value());
      } else if (tag.name().equals("Context")) {
        // ignored
      } else {
        LOGGER.error("Got an unexpected tag: " + tag);
      }
    }

    // add the tag count, now that we know it
    keys.add(TAG_COUNT);
    // ignore the hostname in the tags, if we know it
    values.add(tagCount);

    keys.add(ANNOTATION_COUNT);
    values.add(annotationCount);

    // compile the statement together
    stmt += COMMAS.join(keys);
    stmt += ") VALUES (" + COMMAS.join(values) + ")";

    if (LOGGER.isTraceEnabled()) {
      LOGGER.trace("Logging metrics to phoenix table via: " + stmt);
      LOGGER.trace("With tags: " + variableValues);
    }
    try (PreparedStatement ps = conn.prepareStatement(stmt)) {
      // add everything that wouldn't/may not parse
      int index = 1;
      for (String tag : variableValues) {
        ps.setString(index++, tag);
      }
      // Not going through the standard route of using statement.execute() as that code path
      // is blocked if the metadata hasn't been been upgraded to the new minor release.
      MutationPlan plan = ps.unwrap(PhoenixPreparedStatement.class).compileMutation(stmt);
      MutationState state = conn.unwrap(PhoenixConnection.class).getMutationState();
      MutationState newState = plan.execute();
      state.join(newState);
    } catch (SQLException e) {
      LOGGER.error("Could not write metric: \n" + record + " to prepared statement:\n" + stmt, e);
    }
  }

  public static String getDynamicColumnName(String family, String column, int count) {
    return COLUMN_JOIN.join(family, column) + count;
  }

  private void addDynamicEntry(List<String> keys, List<Object> values, List<String> variableValues,
    String family, MetricsTag tag, MetricInfo metric, int count) {
    // <family><.dynColumn><count> <VARCHAR>
    keys.add(getDynamicColumnName(family, metric.columnName, count) + " VARCHAR");

    // build the annotation value
    String val = tag.description() + " - " + tag.value();
    values.add(VARIABLE_VALUE);
    variableValues.add(val);
  }

  @VisibleForTesting
  public void clearForTesting() throws SQLException {
    this.conn.rollback();
  }
}
